package org.rocketmq.template;

import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.rocketmq.utils.MessagesSplitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @author zd
 * @date 2023/2/2 11:39
 * @description Thin wrapper around the RocketMQ producers for sending messages.
 * Method naming: {@code send} is synchronous, {@code sendAsync} is asynchronous,
 * {@code sendOneway} is one-way (fire-and-forget).
 *
 * <p>NOTE: despite its name, the {@code timeout} parameter accepted by the send methods
 * is not a send timeout. When it is greater than 0 it is stored on the message through
 * {@code Message#setDeliverTimeMs}, i.e. it is a scheduled delivery timestamp in
 * milliseconds. Pass {@code 0} for immediate delivery.
 *
 * <p>The producers are injected through setters; calling a send method before the
 * corresponding producer has been set fails with an {@link IllegalStateException}.
 */
public class RocketMQTemplate {
    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);

    private DefaultMQProducer defaultMQProducer;
    private TransactionMQProducer transactionMQProducer;

    /**
     * Sets the producer used for normal, ordered and batch messages.
     *
     * @param defaultMQProducer producer instance; lifecycle is managed by the caller
     */
    public void setDefaultMQProducer(DefaultMQProducer defaultMQProducer) {
        this.defaultMQProducer = defaultMQProducer;
    }

    /**
     * Sets the producer used for transactional messages.
     *
     * @param transactionMQProducer producer instance; lifecycle is managed by the caller
     */
    public void setTransactionMQProducer(TransactionMQProducer transactionMQProducer) {
        this.transactionMQProducer = transactionMQProducer;
    }

    /**
     * Synchronously sends a message without tags or keys.
     *
     * @param topic   destination topic
     * @param msg     message body, encoded as UTF-8
     * @param timeout scheduled delivery timestamp in milliseconds; ignored unless greater than 0
     * @return the broker's send result
     * @throws Exception if the underlying producer fails
     */
    public SendResult send(String topic, String msg, long timeout) throws Exception {
        return this.send(topic, "", msg, timeout);
    }

    /**
     * Synchronously sends a tagged message without keys or delay level.
     *
     * @param topic   destination topic
     * @param tags    filter tag
     * @param msg     message body, encoded as UTF-8
     * @param timeout scheduled delivery timestamp in milliseconds; ignored unless greater than 0
     * @return the broker's send result
     * @throws Exception if the underlying producer fails
     */
    public SendResult send(String topic, String tags, String msg, long timeout) throws Exception {
        return this.send(topic, tags, "", msg, 0, timeout);
    }

    /**
     * Synchronously sends a message with full control over tags, keys and delay.
     *
     * @param topic          destination topic
     * @param tags           filter tag
     * @param keys           index keys
     * @param msg            message body, encoded as UTF-8
     * @param delayTimeLevel delay level; only applied when greater than 0
     * @param timeout        scheduled delivery timestamp in milliseconds; ignored unless greater than 0
     * @return the broker's send result
     * @throws Exception if the underlying producer fails
     */
    public SendResult send(String topic, String tags, String keys, String msg, int delayTimeLevel, long timeout) throws Exception {
        return this.sendMsg(topic, tags, keys, msg, null, false, delayTimeLevel, timeout);
    }

    /**
     * Asynchronously sends a message without tags or keys.
     *
     * @param topic        destination topic
     * @param msg          message body, encoded as UTF-8
     * @param sendCallback callback notified of the outcome; if {@code null} the message is
     *                     sent synchronously instead and the result is discarded
     * @param timeout      scheduled delivery timestamp in milliseconds; ignored unless greater than 0
     * @throws Exception if the underlying producer fails
     */
    public void sendAsync(String topic, String msg, SendCallback sendCallback, long timeout) throws Exception {
        this.sendAsync(topic, "", msg, sendCallback, timeout);
    }

    /**
     * Asynchronously sends a tagged message without keys or delay level.
     *
     * @param topic        destination topic
     * @param tags         filter tag
     * @param msg          message body, encoded as UTF-8
     * @param sendCallback callback notified of the outcome; if {@code null} the message is
     *                     sent synchronously instead and the result is discarded
     * @param timeout      scheduled delivery timestamp in milliseconds; ignored unless greater than 0
     * @throws Exception if the underlying producer fails
     */
    public void sendAsync(String topic, String tags, String msg, SendCallback sendCallback, long timeout) throws Exception {
        this.sendAsync(topic, tags, "", msg, sendCallback, 0, timeout);
    }

    /**
     * Asynchronously sends a message with full control over tags, keys and delay.
     *
     * @param topic          destination topic
     * @param tags           filter tag
     * @param keys           index keys
     * @param msg            message body, encoded as UTF-8
     * @param sendCallback   callback notified of the outcome; if {@code null} the message is
     *                       sent synchronously instead and the result is discarded
     * @param delayTimeLevel delay level; only applied when greater than 0
     * @param timeout        scheduled delivery timestamp in milliseconds; ignored unless greater than 0
     * @throws Exception if the underlying producer fails
     */
    public void sendAsync(String topic, String tags, String keys, String msg, SendCallback sendCallback, int delayTimeLevel, long timeout) throws Exception {
        this.sendMsg(topic, tags, keys, msg, sendCallback, false, delayTimeLevel, timeout);
    }

    /**
     * Sends a message one-way (no acknowledgement) without tags or keys.
     *
     * @param topic   destination topic
     * @param msg     message body, encoded as UTF-8
     * @param timeout scheduled delivery timestamp in milliseconds; ignored unless greater than 0
     * @throws Exception if the underlying producer fails
     */
    public void sendOneway(String topic, String msg, long timeout) throws Exception {
        this.sendOneway(topic, "", msg, timeout);
    }

    /**
     * Sends a tagged message one-way (no acknowledgement) without keys or delay level.
     *
     * @param topic   destination topic
     * @param tags    filter tag
     * @param msg     message body, encoded as UTF-8
     * @param timeout scheduled delivery timestamp in milliseconds; ignored unless greater than 0
     * @throws Exception if the underlying producer fails
     */
    public void sendOneway(String topic, String tags, String msg, long timeout) throws Exception {
        this.sendOneway(topic, tags, "", msg, 0, timeout);
    }

    /**
     * Sends a message one-way (no acknowledgement) with full control over tags, keys and delay.
     *
     * @param topic          destination topic
     * @param tags           filter tag
     * @param keys           index keys
     * @param msg            message body, encoded as UTF-8
     * @param delayTimeLevel delay level; only applied when greater than 0
     * @param timeout        scheduled delivery timestamp in milliseconds; ignored unless greater than 0
     * @throws Exception if the underlying producer fails
     */
    public void sendOneway(String topic, String tags, String keys, String msg, int delayTimeLevel, long timeout) throws Exception {
        this.sendMsg(topic, tags, keys, msg, null, true, delayTimeLevel, timeout);
    }

    /**
     * Synchronously sends a message to a queue chosen by the given selector.
     * No keys, delay level or scheduled delivery time are applied.
     *
     * @param topic    destination topic
     * @param tags     filter tag
     * @param msg      message body, encoded as UTF-8
     * @param selector queue selector implementing the caller's routing algorithm
     * @param arg      business identifier handed to the selector
     * @return the broker's send result
     * @throws Exception if the underlying producer fails
     */
    public SendResult send(String topic, String tags, String msg, MessageQueueSelector selector, Object arg) throws Exception {
        return this.sendMsg(topic, tags, "", msg, null, false, 0, 0, selector, arg);
    }

    /**
     * Sends messages in batches; note the 4MB maximum per batch. The list is fed through
     * {@code MessagesSplitter} and each sub-list it yields is sent synchronously.
     * A {@code null} or empty list is a no-op.
     *
     * @param msgs messages to send
     * @throws Exception if the underlying producer fails
     */
    public void sendBatch(List<Message> msgs) throws Exception {
        if (msgs == null || msgs.isEmpty()) {
            // Nothing to send; avoids handing a null list to the splitter.
            return;
        }
        DefaultMQProducer producer = requireDefaultProducer();
        MessagesSplitter messagesSplitter = new MessagesSplitter(msgs);
        while (messagesSplitter.hasNext()) {
            List<Message> subMessageList = messagesSplitter.next();
            SendResult sendResult = producer.send(subMessageList);
            // The logged size is the number of comma-separated ids in the result;
            // guard against a missing result/id so logging can never fail the send.
            String msgId = sendResult == null ? null : sendResult.getMsgId();
            int sentCount = msgId == null ? 0 : msgId.split(",").length;
            log.info("消息发送状态: {}, size,{}", sendResult, sentCount);
        }
    }

    /**
     * Builds the message to send.
     *
     * @param topic          destination topic
     * @param tags           filter tag
     * @param keys           index keys
     * @param msg            message body, encoded as UTF-8
     * @param delayTimeLevel delay level; only written to the message when greater than 0
     * @param timeout        scheduled delivery timestamp in milliseconds; only written when greater than 0
     * @return the populated message
     */
    private Message createMessage(String topic, String tags, String keys, String msg, int delayTimeLevel, long timeout) {
        Message message = new Message(topic, tags, keys, msg.getBytes(StandardCharsets.UTF_8));
        if (delayTimeLevel > 0) {
            // Only meaningful when positive, so don't attach a delay level to every plain message.
            message.setDelayTimeLevel(delayTimeLevel);
        }
        if (timeout > 0) {
            message.setDeliverTimeMs(timeout);
        }
        return message;
    }

    /**
     * Sends a normal message.
     *
     * @param topic          topic
     * @param tags           filter tag
     * @param keys           index key list
     * @param msg            message body, encoded as UTF-8
     * @param sendCallback   asynchronous callback; {@code null} selects a synchronous send
     * @param isOneway       whether to send one-way; takes precedence over {@code sendCallback}
     * @param delayTimeLevel delay level, only effective when greater than 0; levels as listed by the
     *                       original author: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * @param timeout        scheduled delivery timestamp, in milliseconds
     * @return the send result for a synchronous send; {@code null} for one-way and asynchronous sends
     * @throws Exception if the underlying producer fails
     */
    private SendResult sendMsg(String topic, String tags, String keys, String msg, SendCallback sendCallback, boolean isOneway,
                               int delayTimeLevel, long timeout) throws Exception {
        Message message = createMessage(topic, tags, keys, msg, delayTimeLevel, timeout);
        log.info("监听到消息发送, topic={}, tags={}, keys={}, msg={}, isOneway={}, delayTimeLevel={}, timeout={}",
                topic, tags, keys, msg, isOneway, delayTimeLevel, timeout);
        DefaultMQProducer producer = requireDefaultProducer();
        if (isOneway) {
            // One-way send: there is no request/response handling, so a failed send is not retried
            // and the data is lost. If loss is unacceptable, use the synchronous or asynchronous send.
            producer.sendOneway(message);
            return null;
        }
        if (sendCallback == null) {
            // Synchronous send; the original author treats "no exception thrown" as success.
            // Callers needing stricter guarantees can inspect the returned SendResult.
            return producer.send(message);
        }
        // Asynchronous send; the outcome is reported through the callback.
        producer.send(message, sendCallback);
        return null;
    }

    /**
     * Synchronously sends a message to a queue picked by {@code selector}.
     * {@code sendCallback} and {@code isOneway} are accepted for signature symmetry but are not used here.
     *
     * @param topic          topic
     * @param tags           filter tag
     * @param keys           index key list
     * @param msg            message body, encoded as UTF-8
     * @param sendCallback   unused
     * @param isOneway       unused
     * @param delayTimeLevel delay level, only effective when greater than 0
     * @param timeout        scheduled delivery timestamp, in milliseconds
     * @param selector       message queue selector; custom queue-selection algorithm keyed on a business identifier
     * @param arg            business identifier used to select the queue
     * @return the broker's send result
     * @throws Exception if the underlying producer fails
     */
    private SendResult sendMsg(String topic, String tags, String keys, String msg, SendCallback sendCallback, boolean isOneway,
                               int delayTimeLevel, long timeout, MessageQueueSelector selector, Object arg) throws Exception {
        Message message = createMessage(topic, tags, keys, msg, delayTimeLevel, timeout);
        log.info("监听到消息发送, message={}", message);
        return requireDefaultProducer().send(message, selector, arg);
    }

    /**
     * Sends a transactional message. {@code null} is passed as the transaction argument.
     *
     * @param message message to send in a transaction
     * @return the transactional send result
     * @throws Exception if the underlying producer fails
     */
    public TransactionSendResult sendTransactionMsg(Message message) throws Exception {
        if (this.transactionMQProducer == null) {
            throw new IllegalStateException("transactionMQProducer has not been set");
        }
        return this.transactionMQProducer.sendMessageInTransaction(message, null);
    }

    /**
     * Returns the configured default producer, failing with a clear message if it was never injected.
     */
    private DefaultMQProducer requireDefaultProducer() {
        if (this.defaultMQProducer == null) {
            throw new IllegalStateException("defaultMQProducer has not been set");
        }
        return this.defaultMQProducer;
    }

}
